-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ROCKETMQ-82: RocketMQ-Flink Integration #45
Conversation
@vongosling @zhouxinyu Do yo have time to take a look? |
@vongosling @zhouxinyu ping.. |
Any comments are welcome.. |
Will add more unit tests later. |
Added unit tests. |
Cool, let's move this PR forward. |
I'm glad to see that some users have used this patch in some scenes. But I would suggest to fork the code from official repo, not my personal. |
Also, I will update the status of integrating and SQL modules in community mailing list periodically. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some comments, please check.
rocketmq-flink/README.md
Outdated
| consumer.topic | consumer topic *Required* | null | | ||
| consumer.tag | consumer topic tag | * | | ||
| consumer.offset.reset.to | what to do when there is no initial offset on the server | latest/earliest/timestamp | | ||
| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set | $TIMESTAMP | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value $TIMESTAMP and $UUID are confusing, especially the $ symbol, a short description may be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Will fix.
rocketmq-flink/pom.xml
Outdated
<developer> | ||
<id>vesense</id> | ||
<name>Xin Wang</name> | ||
<email>xinwang@apache.org</email> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not recommended to use personal user info, see: https://github.com/apache/rocketmq/blob/master/pom.xml
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove.
rocketmq-flink/pom.xml
Outdated
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
<!--maven properties --> | ||
<maven.test.skip>true</maven.test.skip> | ||
<maven.javadoc.skip>true</maven.javadoc.skip> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, it's not a good practice that skip test and javadoc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix.
rocketmq-flink/pom.xml
Outdated
<!--<groupId>ch.qos.logback</groupId>--> | ||
<!--<artifactId>logback-classic</artifactId>--> | ||
<!--</exclusion>--> | ||
<!--</exclusions>--> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the exclusions section isn't necessary, just remove it and make code clean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove.
/** | ||
* RocketMqConfig for Consumer/Producer. | ||
*/ | ||
public class RocketMqConfig { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use RocketMQConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also RocketMqUtils
-> RocketMQUtils
.
producer.send(msg, new SendCallback() { | ||
@Override | ||
public void onSuccess(SendResult sendResult) { | ||
LOG.debug("Async send message success!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about appending the message id in send result to debug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can print the SendResult
here following the way of sync sending.
// sync sending, will return a SendResult | ||
try { | ||
SendResult result = producer.send(msg); | ||
if (LOG.isDebugEnabled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to check whether debug is enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Will remove.
// output and state update are atomic | ||
synchronized (lock) { | ||
context.collectWithTimestamp(data, msg.getBornTimestamp()); | ||
putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need use the next begin offset in other PullStatus?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. We need the nextBeginOffset
in the PullResult
to update the offset table which will be checkpointing when snapshotState
invoked .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean we also should update the offset when the pullresult is OFFSET_ILLEGAL or NO_MATCHED_MSG
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Will fix.
@zhouxinyu Addressed your comments. |
@zhouxinyu Addressed all of your comments. Please take a look again. |
Message msg = prepareMessage(input); | ||
|
||
if (batchFlushOnCheckpoint) { | ||
batchList.add(msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add action isn't protected by the lock batchList
, is this a concurrent issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not an issue. Each RocketMQSink instance belong to a thread. We just need to sync batchList
when snapshotState
invoked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
LGTM now~ |
Thanks @zhouxinyu |
@zhouxinyu Can we merge this in? |
LGTM |
What is the purpose of the change
RocketMQ integration for Apache Flink. This module includes the RocketMQ source and sink that allows a flink job to either write messages into a topic or read from topics in a flink job.
https://issues.apache.org/jira/browse/ROCKETMQ-82
Brief changelog
Otherwise, the source doesn't provide any reliability guarantees.
withBatchFlushOnCheckpoint(true)
is set.Otherwise, the sink reliability guarantees depends on rocketmq producer's retry policy, for this case, the messages sending way is sync by default,
but you can change it by invoking
withAsync(true)
.org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema
interface.rocketmq-flink
includes general purposeKeyValueDeserializationSchema
implementations calledSimpleKeyValueDeserializationSchema
.org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema
interface.rocketmq-flink
includes general purposeKeyValueSerializationSchema
implementations calledSimpleKeyValueSerializationSchema
.org.apache.rocketmq.flink.common.selector.TopicSelector
interface.rocketmq-flink
includes general purposeTopicSelector
implementations calledDefaultTopicSelector
andSimpleTopicSelector
.Verifying this change
local & unit tests check passed ✅
apache-rat check passed ✅
checkstyle check passed ✅